package scales.utils.io
import scales.utils._
import resources._
import scalaz.{IterV, Enumerator, Input}
import scalaz.IterV._
import java.io._
import java.nio.channels._
import java.nio.ByteBuffer
sealed trait DataChunkEvidence[T]
object DataChunkEvidence {
implicit val justDataChunk: DataChunkEvidence[DataChunk] =
new DataChunkEvidence[DataChunk]{}
}
object ReadableByteChannelWrapper {
val emptyBytes = Array.ofDim[ Byte ](0)
}
trait DataChunker[T] extends CloseOnNeed {
def underlyingClosed = false
protected implicit val ev: DataChunkEvidence[T]
def nextChunk: DataChunk
}
class ReadableByteChannelWrapperBase[T](val channel: ReadableByteChannel, protected val buffer: ByteBuffer, val closeChannel: Boolean = true, protected val backingArray: Array[Byte] = ReadableByteChannelWrapper.emptyBytes )(implicit val ev: DataChunkEvidence[T]) extends DataChunker[T] with CloseOnNeed {
if (!buffer.hasArray) {
require(
backingArray.length > 0,
"A ReadableByteChannelWrapper with a Direct buffer must be created with a non empty backingArray")
}
override def underlyingClosed = !channel.isOpen
protected def doClose = {
if (closeChannel) {
channel.close()
}
}
protected def jbytes() : DataChunk = {
buffer.clear()
val read = channel.read(buffer)
read match {
case -1 => {
closeResource
EOFData
}
case 0 => EmptyData
case _ => Chunk(buffer.array, 0, read)
}
}
private var leftInBuffer = 0
protected def direct() : DataChunk = {
if (leftInBuffer > 0) {
val used = math.min(leftInBuffer, backingArray.length)
if (leftInBuffer >= used) {
leftInBuffer = leftInBuffer - used
} else {
leftInBuffer = 0
}
buffer.get(backingArray, 0, used)
Chunk(backingArray, 0, used)
} else {
buffer.clear()
val read = channel.read(buffer)
val rem = buffer.remaining
read match {
case -1 =>
closeResource
EOFData
case 0 => EmptyData
case _ =>
val used = math.min(math.min(rem, read), backingArray.length)
if (read > used) {
leftInBuffer = read - used
}
buffer.rewind()
buffer.get(backingArray, 0, used)
Chunk(backingArray, 0, used)
}
}
}
final def nextChunk: DataChunk =
if (buffer.hasArray)
jbytes()
else
direct()
}
class ReadableByteChannelWrapper[T](channel: ReadableByteChannel, closeChannel: Boolean = true, private val bytePool: Pool[ByteBuffer] = DefaultBufferPool, private val directBufferArrayPool: Pool[Array[Byte]] = DefaultByteArrayPool )(implicit ev: DataChunkEvidence[T]) extends {
override protected val buffer: ByteBuffer = bytePool.grab
override protected val backingArray: Array[Byte] =
if (buffer.hasArray)
ReadableByteChannelWrapper.emptyBytes
else
directBufferArrayPool.grab
} with
ReadableByteChannelWrapperBase[T](
channel, buffer, closeChannel, backingArray ) {
override protected def doClose = {
if (!buffer.hasArray) {
directBufferArrayPool.giveBack(backingArray)
}
bytePool.giveBack(buffer)
super.doClose
}
}
class RBCImplicitWrapper(channel: ReadableByteChannel)(implicit ev: DataChunkEvidence[DataChunk]){
def wrapped: DataChunker[DataChunk] = new ReadableByteChannelWrapper(channel)
}
trait ReadableByteChannelWrapperImplicits {
implicit def toRBCWrapper(channel: ReadableByteChannel)(implicit ev: DataChunkEvidence[DataChunk]): RBCImplicitWrapper = new RBCImplicitWrapper(channel)
implicit def dataChunkerEnumerator[T[_] <: DataChunker[_]]: Enumerator[T] =
new AsyncDataChunkerEnumerator[T]()
val INFINITE_RETRIES = -1
class AsyncDataChunkerEnumerator[T[_] <: DataChunker[_]]( contOnCont: Int = 5 ) extends Enumerator[T] {
def apply[E,A](chunker: T[E], i: IterV[E,A]): IterV[E, A] = {
def apply(chunker: T[E], i: IterV[E,A], count: Int): IterV[E, A] = {
i match {
case _ if chunker.underlyingClosed || chunker.isClosed => i
case Done(acc, input) => i
case Cont(k) =>
val realChunk = chunker.nextChunk
val nextChunk = realChunk.asInstanceOf[E]
val nextI =
if (realChunk.isEOF) {
k(IterV.EOF[E])
} else
if (realChunk.isEmpty)
k(IterV.Empty[E])
else
k(El(nextChunk))
val nc =
if (realChunk.isEmpty && !isDone(nextI)) {
count + 1
} else 0
if ((contOnCont != INFINITE_RETRIES) && (nc > contOnCont)) {
nextI
} else
apply(chunker, nextI, nc)
}
}
apply(chunker, i, 0)
}
}
}